Step FunctionsステートマシンからDynamoDBテーブルに対して26件以上のUpdateItem操作をしたい(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
今回は、AWS Step FunctionsステートマシンからAmazon DynamoDBテーブルに対して26件以上のUpdateItem操作を行う構成をAWS CDKで作成してみました。
TransactWriteItemsで一度に更新処理できる件数は最大25件
以前の下記エントリでは、ステートマシンからDynamoDBテーブル上の複数アイテムのUpdateItemをTransactWriteItemsを使って行いました。
しかしこのTransactWriteItemsは一度に更新処理できる件数は最大25件という制限があります。
TransactWriteItems is a synchronous write operation that groups up to 25 action requests.
そこで今回は、ステートマシンから26件以上のUpdateItem操作を、TransactWriteItems操作をMapステートでループ実行する構成で実装してみます。
やってみた
CDKコード
import * as cdk from '@aws-cdk/core'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as tasks from '@aws-cdk/aws-stepfunctions-tasks'; import * as dynamodb from '@aws-cdk/aws-dynamodb'; export class AwsCdkAppStack extends cdk.Stack { constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); //UpdateItem対象のDynamoDBテーブル const deviceTable = new dynamodb.Table(this, 'deviceTable', { tableName: 'deviceTable', partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING }, billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, }); //transactWriteItemsアクション const updateDeviceData = new tasks.CallAwsService( this, 'updateDeviceData', { service: 'dynamodb', action: 'transactWriteItems', parameters: { 'TransactItems.$': '$.transactItems', }, iamResources: [deviceTable.tableArn], iamAction: 'dynamodb:*', } ); //Mapステート const updateDeviceDataMap = new sfn.Map(this, 'updateDeviceDataMap', { itemsPath: sfn.JsonPath.stringAt('$.mapItems'), parameters: { 'transactItems.$': '$$.Map.Item.Value.TransactItems', }, }); updateDeviceDataMap.iterator(updateDeviceData); //ステートマシン new sfn.StateMachine(this, 'updateDeviceDataStateMachine', { stateMachineName: 'updateDeviceDataStateMachine', definition: updateDeviceDataMap, }); } }
cdk deploy
でデプロイします。
すると下記のような定義のステートマシンが作成されます。
{ "StartAt": "updateDeviceDataMap", "States": { "updateDeviceDataMap": { "Type": "Map", "End": true, "Parameters": { "transactItems.$": "$$.Map.Item.Value.TransactItems" }, "Iterator": { "StartAt": "updateDeviceData", "States": { "updateDeviceData": { "End": true, "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:dynamodb:transactWriteItems", "Parameters": { "TransactItems.$": "$.transactItems" } } } }, "ItemsPath": "$.mapItems" } } }
動作
以下のようなペイロードを入力にしてステートマシンを実行します。1件目のTransactItemsは25件、2件目のTransactItemsは2件のUpdateItem操作を指定しています。(大きいので折りたたんでいます。)
入力
{ "mapItems": [ { "TransactItems": [ { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d001" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d002" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d003" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス003" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d004" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス004" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d005" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d006" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d007" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス007" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d008" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス008" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d009" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d010" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d011" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス011" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d012" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス012" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d013" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d014" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d015" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス015" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d016" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス016" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d017" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d018" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d019" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス019" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d020" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス020" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d021" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d022" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d023" } }, "ExpressionAttributeValues": { ":temperature": { "N": "15" }, ":deviceName": { "S": "デバイス023" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d024" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス024" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d025" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } } ] }, { "TransactItems": [ { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d026" } }, "ExpressionAttributeValues": { ":temperature": { "N": "10" } }, "ExpressionAttributeNames": { "#temperature": "temperature" }, "UpdateExpression": "SET #temperature = :temperature" } }, { "Update": { "TableName": "deviceTable", "Key": { "deviceId": { "S": "d027" } }, "ExpressionAttributeValues": { ":temperature": { "N": "30" }, ":deviceName": { "S": "デバイス027" } }, "ExpressionAttributeNames": { "#deviceName": "deviceName", "#temperature": "temperature" }, "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature" } } ] } ] }
テーブルのアイテム一覧を見ると、26件以上のアイテムがUpdateItemできています。
制限
1件のTransactItemsで指定可能な操作件数は25件まで
今回回避しようとした制限です。1件のTransactItemsで26件以上を指定した場合は以下のようなエラーによりMapイテレーションの実行が失敗します。
at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 25 (Service: DynamoDb, Status Code: 400, Request ID: 378b1607-ed9b-4201-a4a3-4932081c51fd, Extended Request ID: null)"
ステートマシンの最大入出力サイズは256KB
ステートマシンのタスク、状態や実行の最大入出力サイズは256KB(262,144バイト)です。
256KBを超えるサイズのデータを扱いたい場合は、S3バケットにデータを一時保存するなどして256KB内に収めるようにします。
ちなみに今回の動作確認で使用した入力データは約14KBでした。
参考
以上